package cn.doitedu.datayi.etl

import ch.hsr.geohash.GeoHash
import org.apache.commons.io.IOUtils
import org.apache.commons.lang3.StringUtils
import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs.{FSDataInputStream, FileSystem, LocatedFileStatus, Path, RemoteIterator}
import org.apache.spark.broadcast.Broadcast
import org.apache.spark.rdd.RDD
import org.apache.spark.sql.{DataFrame, Dataset, Row, SparkSession}
import org.lionsoul.ip2region.{DataBlock, DbConfig, DbSearcher}

import java.net.URI
import java.text.{DateFormat, SimpleDateFormat}
import java.util.{Date, UUID}

/**
 * @author 涛哥
 * @nick_name "deep as the sea"
 * @contact qq:657270652 wx:doit_edu
 * @site www.doitedu.cn
 * @date 2021-09-22
 * @desc ETL广义上就是一个数据处理流程： 抽取数据，转换数据，加载到目标存储
 *
 *       狭义上的ETL指的是传统数仓领域中，一种岗位（ETL工程师）和技能！用 kettel 等工具连接公司的各种业务库（mysql，oracle），在各种库之间进行数据的加工、迁移
 *       这种加工、迁移，往往涉及到3个流程：  对接源库表抽取数据，进行各种加工运算，结果load到目标库表！
 */
object AppLogOdsDwdEtl {

  def main(args: Array[String]): Unit = {

    if(args.size != 2){
      println(
        """
          |
          |error:arguments size incorrect!
          |usage:
          |  args(0) : 计算的数据日期
          |  args(1) : ip地理位置数据库文件路径
          |
          |""".stripMargin)
      sys.exit(1)
    }

    // 获取传入的参数
    val dt: String = args(0)
    val ip2regionPath: String = args(1)

    // 构造spark会话实例
    val spark: SparkSession = SparkSession.builder()
      .appName("app日志etl到dwd")
      // .config("spark.sql.shuffle.partitions", 2)
      .enableHiveSupport()
      //.master("local")
      .getOrCreate()

    import spark.implicits._

    // 读数据
    val applog: Dataset[Row] = spark.sql(
      s"""
        |
        |select
        |
        | account                   ,
        | appid                     ,
        | appversion                ,
        | carrier                   ,
        | deviceid  as device_id    ,
        | devicetype                ,
        | eventid                   ,
        | ip                        ,
        | latitude                  ,
        | longitude                 ,
        | nettype                   ,
        | osname                    ,
        | osversion                 ,
        | properties                ,
        | releasechannel            ,
        | resolution                ,
        | sessionid                 ,
        | timestamp                 ,
        | '' as splitedSessionId    ,
        | '' as prvoince            ,
        | '' as city                ,
        | '' as region              ,
        | -1 as guid
        |from ods.mall_app_action_log
        |where dt='${dt}'
        |
        |""".stripMargin)


    // 转成rdd
    val applogRdd: RDD[Applog] = applog.as[Applog].rdd

    // 清洗过滤
    //1，过滤掉日志中缺少关键字段（deviceid/properties/eventid/sessionid 缺任何一个都不行）的记录！
    //2，过滤掉日志中不符合时间段的记录（由于app上报日志可能的延迟，有数据延迟到达）
    val sdf = new SimpleDateFormat("yyyy-MM-dd")
    val dayStart: Long = sdf.parse(s"${dt}").getTime
    val dayEnd: Long = dayStart + 24*60*60*1000

    val filtered = applogRdd.filter(applog => {
      StringUtils.isNotBlank(applog.device_id) &&
        StringUtils.isNotBlank(applog.eventid) &&
        StringUtils.isNotBlank(applog.sessionid) &&
        applog.properties != null &&
        applog.timestamp >= dayStart &&
        applog.timestamp < dayEnd
    })

    // 数据规范化
    val normed = filtered.map(applog => {
      if (applog.account.trim == "") applog.account = null
      applog
    })


    // session分割 -- 为什么要session分割
    /**
     * 点赞,u01,s01,t01 ,ns01
     * 收藏,u01,s01,t02 ,ns01
     * 收藏,u01,s01,t08 ,ns02
     * 收藏,u01,s01,t09 ,ns02
     * 收藏,u01,s01,t10 ,ns02
     * 分享,u01,s01,t11 ,ns02
     */
    val sessionSplited: RDD[Applog] = normed.groupBy(_.sessionid)
      .flatMap(tp => {
        val originSession: List[Applog] = tp._2.toList.sortBy(_.timestamp)

        // 先生成一个新sessionid
        var splitedSessionId: String = UUID.randomUUID().toString

        for (i <- 0 until originSession.size) {
          originSession(i).splitedSessionId = splitedSessionId
          // 判断是否要更新splitedSessionid
          if (i < originSession.size - 1 && originSession(i + 1).timestamp - originSession(i).timestamp > 30 * 60 * 1000)
            splitedSessionId = UUID.randomUUID().toString
        }

        originSession
      })

    // 数据（地理位置）集成

    // 加载hive中的地理位置维表，并广播
    val geoHashTable: DataFrame = spark.read.table("dim.geohash_area").where("geohash is not null")
    val geohashMap = geoHashTable.collect().map({
      case Row(geohash: String, province: String, city: String, region: String) =>
        (geohash, (province, city, region))
    }).toMap
    val bc1 = spark.sparkContext.broadcast(geohashMap)

    // 构造一个hdfs的客户端
    val conf = new Configuration()
    conf.set("fs.defaultFS", "hdfs://doit01:8020")
    val fs: FileSystem = FileSystem.get(conf)
    val path = new Path(ip2regionPath)
    val fileLen = fs.listStatus(path)(0).getLen

    // 读取hdfs上的库文件，装入一个字节数组
    val in: FSDataInputStream = fs.open(path)
    val bytes = new Array[Byte](fileLen.toInt)
    IOUtils.readFully(in, bytes)

    // 广播ip2region文件的字节数组
    val bc2 = spark.sparkContext.broadcast(bytes)


    // 地理位置信息查询集成逻辑所在
    val aread = sessionSplited.map(applog => {
      val lat: Double = applog.latitude
      val lng: Double = applog.longitude

      var (province, city, region) = ("未知省", "未知市", "未知区")

      // 从广播变量中取到geohash参考点知识字典
      try {
        val geohashDict: Map[String, (String, String, String)] = bc1.value

        // 用日志数据中的经纬度坐标转成geohash
        val geoHashcode: String = GeoHash.geoHashStringWithCharacterPrecision(lat, lng, 6)

        // 用geohash码，去参考点知识字典中查询对应的省市区信息
        val tp = geohashDict.getOrElse(geoHashcode, ("未知省", "未知市", "未知区"))

        // 将查询到的结果给临时变量赋值
        province = tp._1
        city = tp._2
        region = tp._3

      } catch {
        case e: Exception => e.printStackTrace()
      }

      // 如果用gps坐标查询失败，则再用ip地址查询一次
      if (province.equals("未知省")) {
        val searcher = new DbSearcher(new DbConfig(), bc2.value)
        val block: DataBlock = searcher.memorySearch(applog.ip)
        // 中国|0|上海|上海|电信
        val strings: Array[String] = block.getRegion.split("\\|")
        if (strings(0).equals("中国")) {
          province = strings(2)
          city = strings(3)
        }
      }

      // 将查询到的省市区，设置到applog中，并返回
      applog.prvoince = province
      applog.city = city
      applog.region = region

      // 返回更新后的applog
      applog
    })


    // 全局唯一标识
    // 主体逻辑： 1. 如果日志中有account，则用account去用户注册信息表中查找对应的guid
    //          2. 如果日志中没有account,则用deviceid去“绑定评分表”查找绑定的account，进而查找对应的guid
    //          3. 如果日志中没有account，也没有绑定的account，则从“空设备id映射表”去查该设备id对应的guid

    // 加载 商城注册用户信息表,并收集到driver端，然后广播
    val mallUser: Dataset[Row] = spark.read.table("dwd.mall_member").where(s"dt='${dt}'")
    val mallUserQueryMap = mallUser.rdd.map({
      case Row(id: Long, account: String, _ ) => (account, id)
    }).collectAsMap()
    val bc3  = spark.sparkContext.broadcast(mallUserQueryMap)


    // 1.对日志中没有账号的数据，根据 账号绑定表，填补账号
    aread.toDS().createTempView("tmp1")
    val accountBinded = spark.sql(
      s"""
        |with tmp2 as (
        | select device_id,account from  dws.mall_app_dev_bind_f where dt='${dt}'
        |)
        |select
        |nvl(tmp1.account,tmp2.account) as account,
        |tmp1.appid               ,
        |tmp1.appversion          ,
        |tmp1.carrier             ,
        |tmp1.device_id            ,
        |tmp1.devicetype          ,
        |tmp1.eventid             ,
        |tmp1.ip                  ,
        |tmp1.latitude            ,
        |tmp1.longitude           ,
        |tmp1.nettype             ,
        |tmp1.osname              ,
        |tmp1.osversion           ,
        |tmp1.properties          ,
        |tmp1.releasechannel      ,
        |tmp1.resolution          ,
        |tmp1.sessionid           ,
        |tmp1.timestamp           ,
        |tmp1.splitedSessionId    ,
        |tmp1.prvoince            ,
        |tmp1.city                ,
        |tmp1.region              ,
        |tmp1.guid
        |
        |from tmp1   LEFT JOIN  tmp2 ON tmp1.device_id=tmp2.device_id
        |
        |
        |""".stripMargin)


    // 2. 如果日志中有account，则用account去用户注册信息表中查找对应的guid
    val accountGuided: RDD[Applog] = accountBinded.as[Applog].rdd.map(applog=>{

      var guid:Long = -1
      val account: String = applog.account

      // 用account去用户注册表中查询
      if(StringUtils.isNotBlank(account)) {
        val mallUsers: collection.Map[String, Long] = bc3.value
        guid = mallUsers.getOrElse(account, -1)
      }
      applog.guid = guid
      applog
    })


    // 通过account查找guid成功的数据
    val accountGuidedNotNull = accountGuided.filter(_.guid != -1).toDS()

    // 通过account查找guid失败的数据，注册成一个视图
    val accountGuiedNull: RDD[Applog] = accountGuided.filter(_.guid == -1)
    accountGuiedNull.toDS().createTempView("account_guid_null")

    // 通过空设备id获取对应的guid
    val deviceGuidedNotNull = spark.sql(
      s"""
        |with idmp as (
        |  select  device_id, guid from dws.device_guid_idmp where dt='${dt}'
        |)
        |select
        |  a.account
        |  ,a.appid
        |  ,a.appversion
        |  ,a.carrier
        |  ,a.device_id
        |  ,a.devicetype
        |  ,a.eventid
        |  ,a.ip
        |  ,a.latitude
        |  ,a.longitude
        |  ,a.nettype
        |  ,a.osname
        |  ,a.osversion
        |  ,a.properties
        |  ,a.releasechannel
        |  ,a.resolution
        |  ,a.sessionid
        |  ,a.timestamp
        |  ,a.splitedSessionId
        |  ,a.prvoince
        |  ,a.city
        |  ,a.region
        |  ,idmp.guid
        |  -- , '2021-09-18'  as dt
        |from account_guid_null a left join idmp on a.device_id=idmp.device_id
        |
        |""".stripMargin).as[Applog]



    // 4. 把通过账号找到了guid的数据  UNION ALL  通过空设备id找到了guid的数据
    val res = accountGuidedNotNull.union(deviceGuidedNotNull)

    // 保存方式1：  使用partitionBy方法来实现动态分区插入
    // res.write.partitionBy("dt").saveAsTable("dwd.mall_app_evt_dtl")


    // 保存方式2： 使用sql的insert关键字
    res.createTempView("res")
    spark.sql(
      s"""
        |
        |insert into dwd.mall_app_evt_dtl partition(dt='${dt}')
        |select  * from res
        |
        |""".stripMargin)


    spark.close()
  }
}
